Amazon SNS を使用したタスクのエラー通知を有効化する #SnowflakeDB
はじめに
Snowflake のタスクでは、実行中にエラーが発生した場合、各クラウドサービスのメッセージング サービスにエラー通知をプッシュできます。こちらの構成を行ってみたので手順をまとめます。
タスクのエラー通知の概要
Snowflake ではタスクに通知統合を割り当てることで、エラー発生時に通知をプッシュすることが可能です。この際、以下の各クラウドサービスのメッセージング サービスを利用できます
- Amazon Simple Notification Service (SNS)
- Microsoft Azure Event Grid
- Google Pub/Sub
注意点として、執筆時点ではクロスクラウド サポートはプッシュ通知では利用できないため、Snowflake アカウントがホストされているクラウド プラットフォームのメッセージング サービスに対してエラー通知を構成する必要があります。
その他の特徴は以下です。
- タスク エラー通知機能は、サーバーレス タスクとユーザー管理タスクの両方でサポートされる
- タスクの DAG を構成している場合は、ルートタスクにのみエラー通知を割り当て可能
- DAG を構成するいずれかのタスクが失敗したときに通知される
Amazon SNS を使用したエラー通知の構成
ここではメッセージング サービスとして Amazon SNS を使用したエラー通知の構成を行います。公式ドキュメントとしては以下に記載あるので、こちらに沿って進めます。なお、特に言及の無い項目はデフォルト設定として進めています。
STEP1:Amazon SNS トピックの作成
SNS トピックの作成に関するドキュメントは以下です。
トピックの作成
AWS アカウントにログインしトピックを作成します。
はじめの注意点として、トピックのタイプとして「スタンダード」を指定します。Snowflake 側のドキュメントにも記載がありますが、執筆時点ではスタンダードタイプのみサポートされています。
ここでは下図の設定としました。後述しますが、SNS トピックの名称をタスクの内容がわかるものにしておくことで、通知が来た際にどのタスクのエラーであるかを特定しやすくなります。
サブスクリプションの作成
トピックを作成したら、配信先としてサブスクリプションを作成します。
ここでは「Eメール」を使用し、エンドポイントにエラー通知先として使用する想定のメールアドレスを入力しました。
サブスクリプション作成後、エンドポイントに指定したメールアドレス宛に確認のメールが届くので [Confirm subscrtiption] をクリックします。
下図の表示になります。
STEP2:IAM ポリシーを作成する
作成した SNS トピックに対して「Publish」アクションを許可する IAMポリシーを定義します。ポリシーの内容は以下とし、この際<sns_topic_arn>
の値を作成したトピックの ARN に置き換えます。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sns:Publish"
],
"Resource": "<sns_topic_arn>"
}
]
}
STEP3:AWS IAM ロールを作成する
SNS トピックへの権限を割り当てる IAM ロールを以下の設定で作成します。
- 信頼できるエンティティタイプ:別の AWS アカウント
- 外部IDの要求:有効化
別の AWS アカウントID と外部 ID は後ほど Snowflake 側で取得する値に置き換えるので、仮の値を入力しておきます。
STEP4:通知統合を作成する
Snowflake 側で通知統合を作成します。通知統合はアカウントレベルのオブジェクトのため、デフォルトでは ACCOUNTADMIN 権限でのみ作成可能です。
Amazon SNS に対する通知統合は以下のコマンドで作成できます。
CREATE NOTIFICATION INTEGRATION my_notification_int
ENABLED = TRUE
DIRECTION = OUTBOUND
TYPE = QUEUE
NOTIFICATION_PROVIDER = AWS_SNS
AWS_SNS_TOPIC_ARN = 'arn:aws:sns:us-east-2:111122223333:sns_topic'
AWS_SNS_ROLE_ARN = 'arn:aws:iam::111122223333:role/error_sns_role';
この際、以下の項目はそれぞれ自身の環境の値に置き換えます。
AWS_SNS_TOPIC_ARN
- STEP1 で作成したトピックの ARN に置き換え
AWS_SNS_ROLE_ARN
- STEP3 で作成した IAMロールの ARN に置き換え
STEP5:SNS トピックにSnowflakeアクセスを許可する
IAM ユーザー ARN と SNS トピック外部 ID の取得
作成した通知統合に対して以下のコマンドを実行します。
DESC NOTIFICATION INTEGRATION my_notification_int;
出力は下図のようになります。
後述する手順で使用する以下の値をメモしておきます。
- SF_AWS_IAM_USER_ARN
- SF_AWS_EXTERNAL_ID
IAMロールの信頼関係を変更する
上記の値で STEP2 で作成した IAM ロールの信頼関係を更新します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"AWS": "<SF_AWS_IAM_USER_ARNに置き換え>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "<SF_AWS_EXTERNAL_IDに置き換え>"
}
}
}
]
}
STEP6:タスクでエラー通知を有効にする
サンプルとして以下のタスクを用意します。エラー通知を有効化する際はERROR_INTEGRATION
に作成した通知統合を指定します。
なお、通知統合を指定するタスクを作成するには、通知統合に対する USAGE 権限が必要です。
CREATE OR REPLACE TASK failed_task
SCHEDULE = '5 MINUTE'
ERROR_INTEGRATION = my_notification_int
AS
SELECT * FROM non_existent_table;
;
作成済みのタスクの場合は、以下で更新可能です。
ALTER TASK <name> SET ERROR_INTEGRATION = <integration_name>;
手動で任意のタイミングでタスクを実行する際は以下のコマンドを使用できます。
EXECUTE TASK failed_task;
このタスクは失敗する内容なので、実行後すぐにSNSトピックで指定したメールアドレス宛に下図のメールが届きました。
{
"version":"1.0",
"messageId":"5f6fc753-0f37-48d0-9a16-f3663d10b48c",
"messageType":"USER_TASK_FAILED",
"timestamp":"2024-08-19T01:14:32.005Z",
"accountName":"GE39660",
"taskName":"TASK_TEST.PUBLIC.FAILED_TASK",
"taskId":"01b67182-120e-f5f5-0000-000000000039",
"rootTaskName":"TASK_TEST.PUBLIC.FAILED_TASK",
"rootTaskId":"01b67182-120e-f5f5-0000-000000000039",
"messages":
[
{
"runId":"2024-08-19T01:14:31.473Z",
"scheduledTime":"2024-08-19T01:14:31.473Z",
"queryStartTime":"2024-08-19T01:14:31.866Z",
"completedTime":"2024-08-19T01:14:31.979Z",
"queryId":"01b6718a-0001-9f6c-0001-b9ce0003f112",
"errorCode":"002003",
"errorMessage":"SQL compilation error:\nObject 'NON_EXISTENT_TABLE' does not exist or not authorized."
}
]
}
「my-snowflake-topic」は、SNS のトピック名です。値からタスク名やエラー内容を確認することは可能ですが、特定を容易にするには、トピックの名称からどのタスクかわかるようにしておきます。
ルート以外のタスクに割り当てようとした場合
タスクの DAG を構成している場合は、ルートタスクにのみエラー通知を割り当て可能です。そのため、子タスクに割り当てようとすると以下の通りエラーとなります。これはファイナライザータスクも同様です。
CREATE or replace TASK mytask2
ERROR_INTEGRATION = my_notification_int
AFTER failed_task
AS
select 1;
Triggered Tasks への割り当て
エラー通知の構成は Triggered Task に対しても可能です。
CREATE OR REPLACE TABLE source_table (
id INT,
value STRING,
created_at TIMESTAMP
);
CREATE OR REPLACE STREAM source_table_stream ON TABLE source_table;
-- 空のテーブルを作成
CREATE OR REPLACE TABLE processed_table LIKE source_table;
CREATE OR REPLACE TASK my_triggered_task
ERROR_INTEGRATION = my_notification_int
WAREHOUSE = COMPUTE_WH
WHEN system$stream_has_data('source_table_stream')
AS
-- 存在しないカラムを参照することでエラーを発生させる
INSERT INTO processed_table (id, value, created_at, non_existent_column)
SELECT id, value, created_at, non_existent_column FROM source_table_stream;
;
--タスクを開始
ALTER TASK my_triggered_task RESUME;
INSERT INTO source_table (id, value, created_at) VALUES (1, 'test1', CURRENT_TIMESTAMP);
この場合もタスクが自動的に停止するまで(デフォルトでは連続して10回失敗するまで)各失敗のエラー通知が続けて届きます。
通知履歴の確認
通知の履歴はNOTIFICATION_HISTORY
テーブル関数で確認可能です。
SELECT * FROM TABLE(INFORMATION_SCHEMA.NOTIFICATION_HISTORY());
[MESSAGE_SOURCE] カラムの値がTASK
となっている場合、この通知がタスクのエラー通知によることを表しています。その他に Snowpipe のエラー通知やSYSTEM$SEND_EMAIL
や SYSTEM$SEND_SNOWFLAKE_NOTIFICATION
ストアドプロシージャによる通知履歴もここで確認できます。
さいごに
Amazon SNS を使用したタスクのエラー通知を構成してみました。ここでは簡単にEメールをエンドポイントとしましたが、Lambda と組み合わせることで Slack 等への通知も可能です。
こちらの内容が何かの参考になれば幸いです。